/*
 *  Filename:lsv_log.c
 *  Description:
 *
 *  Created on: 2017年3月2日
 *  Author: Asdf(825674301)
 */

#include "list.h"
#include "lsv_log.h"
#include "lsv_help.h"
#include "lsv_volume.h"

extern uint32_t lsv_feature;

static int lsv_log_delete(lsv_volume_proto_t *lsv_info) {
        lsv_log_info_t *log_info = lsv_info->log_info;
        if (log_info) {
                if (log_info->gc_context) {
                        lsv_gc_delete(lsv_info);
                }
                free(log_info);
        }
        lsv_info->log_info = NULL;
        return 0;
}

int lsv_log_init(lsv_volume_proto_t *lsv_info, lsv_u32_t flag) {
        YASSERT(LSV_LOG_HEAD_AREA >= sizeof(lsv_log_head_t));
        YASSERT(LSV_LOG_HLOG_SIZE == sizeof(lsv_log_hlog_t));
        YASSERT(LSV_LOG_SLOG_SIZE == sizeof(lsv_log_slog_t));
        YASSERT(LSV_LOG_PAGE_NUM == 255);
        YASSERT(LSV_CHUNK_SIZE== sizeof(lsv_gc_old_storage_record_t));

        int ret = 0;
        lsv_log_info_t *log_info = NULL;

        lsv_info->log_info = NULL;
        {
                ret = ymalloc((void **)&log_info, sizeof(lsv_log_info_t));
                if (unlikely(ret)) {
                        ret = -ENOMEM;
                        DERROR("malloc log_info failed, ret %d\n", ret);
                        GOTO(ERR0, ret);
                }
                log_info->gc_context = NULL;

        }
        lsv_info->log_info = log_info;

        if (LSV_SYS_CREATE == flag) {
                ret = lsv_gc_create(lsv_info);
        } else if (LSV_SYS_LOAD == flag) {
                ret = lsv_gc_load(lsv_info);
        } else if (LSV_SYS_RECOVERY == flag) {
                ret = lsv_gc_recovery(lsv_info);
        } else {
                YASSERT(0);
        }
        if (unlikely(ret)) {
                GOTO(ERR1, ret);
        }

        ret = lsv_gc_fullgc_start_timer(lsv_info);
        if (unlikely(ret)) {
                GOTO(ERR1, ret);
        }

        return 0;
ERR1:
        lsv_log_delete(lsv_info);
ERR0:
        return ret;
}

int lsv_log_destroy(lsv_volume_proto_t* lsv_info) {

        lsv_gc_delete(lsv_info);
        return 0;
}

int lsv_log_flush(lsv_volume_proto_t *lsv_info) {

        // TODO stop fullgc

        // TODO flush task_queue

        lsv_log_info_t *log_info = lsv_info->log_info;
        if (log_info) {
                if (log_info->gc_context) {
                        lsv_gc_flush(lsv_info);
                }
        }

        return 0;
}

int lsv_log_setattr(lsv_volume_proto_t *lsv_info) {
        int ret = 0;

        ret = lsv_gc_setattr(lsv_info);
        if (unlikely(ret)) {
                DINFO("setattr log gc error:%d\n", ret);
                goto ERR;
        }

ERR:
        return ret;
}

static int _update_bitmap_directly(lsv_volume_proto_t *lsv_info, lsv_u32_t log_chkid, const lsv_log_proto_t *log_proto,
                int begin, int end) {
        int ret;

        if (begin == -1 || end == -1)
                return 0;

        YASSERT(begin <= end);

        DINFO_NOP("update_bitmap_directly, chunk_id:%u,<%d,%d>\n", log_chkid, begin, end);

        if (begin == end) {
                lsv_bitmap_unit_t bunit;
                lsv_bitmap_unit_init(&bunit, log_chkid, (uint32_t) lsv_log_slogpos2chkoffset(begin), LSV_THIS_VOL_INO);
                DINFO_NOP("update_bitmap <%llu,%u>\t<%u,%u>\n",
                      (LLU )log_proto->hlog[begin].lba,
                      log_proto->hlog[begin].crc,
                      bunit.chunk_id,
                      bunit.chunk_page_off);
                ret = lsv_bitmap_paged_write(lsv_info, log_proto->hlog[begin].lba, &bunit);
                // lsv_info->bitmap_write_count++;
        } else {
                int page_idx, n = end - begin + 1;
                lsv_bitmap_unit_t *bitmap_buf = malloc(sizeof(lsv_bitmap_unit_t) * n);
                for (int i = 0; i < n; i++) {
                        page_idx = begin + i;
                        lsv_bitmap_unit_init(&bitmap_buf[i],
                                             log_chkid,
                                             (uint32_t) lsv_log_slogpos2chkoffset(page_idx),
                                             LSV_THIS_VOL_INO);
                        DINFO_NOP("update_bitmap2 <%llu,%u>\t<%u,%u> %d/%d\n",
                              (LLU)log_proto->hlog[page_idx].lba,
                              log_proto->hlog[page_idx].crc,
                              bitmap_buf[i].chunk_id,
                              bitmap_buf[i].chunk_page_off,
                              i,
                              n);
                }
                ret = lsv_bitmap_batch_write(lsv_info, log_proto->hlog[begin].lba, (uint32_t) n * LSV_PAGE_SIZE,
                                bitmap_buf);
                // lsv_info->bitmap_batch_write_count++;
                if (bitmap_buf) {
                        free(bitmap_buf);
                }
        }
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

int update_bitmap(lsv_volume_proto_t *lsv_info, lsv_u32_t log_chkid, const lsv_log_proto_t *log_proto) {
        int ret = 0, update_count = 0;
        int begin = -1, end = -1;

        struct timeval t1, t2;
        _gettimeofday(&t1, NULL);

        if (lsv_feature & LSV_FEATURE_BITMAP_GROUP_COMMIT) {
                ret = lsv_bitmap_start_session(lsv_info);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }
        
        for (int i = 0; i < log_proto->head->page_count; i++) {
                lsv_bitmap_paged_pre_write(lsv_info, log_proto->hlog[i].lba);
        }
        
        for (int i = 0; i < log_proto->head->page_count; i++) {
                DINFO("refmap write %u/%u lba %llu crc %u chunk_id %u page %u snap_id %u\n",
                      i,
                      log_proto->head->page_count,
                      (LLU)log_proto->hlog[i].lba,
                      log_proto->hlog[i].crc,
                      log_chkid,
                      i,
                      log_proto->hlog[i].snap_id);
#ifdef __PAGE_CRC32__
                YASSERT(log_proto->hlog[i].crc == page_crc32(log_proto->slog[i].page));
#endif
                if (i == 0) {
                        begin = i;
                        end = i;
                } else {
                        if (log_proto->hlog[i].lba == log_proto->hlog[i - 1].lba + LSV_PAGE_SIZE) {
                                end++;
                        } else {
                                //
                                ret = _update_bitmap_directly(lsv_info, log_chkid, log_proto, begin, end);
                                if (unlikely(ret))
                                        GOTO(err_ret, ret);

                                update_count++;
                                begin = i;
                                end = i;
                        }
                }
        }

        ret = _update_bitmap_directly(lsv_info, log_chkid, log_proto, begin, end);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (lsv_feature & LSV_FEATURE_BITMAP_GROUP_COMMIT) {
                ret = lsv_bitmap_commit_dirty(lsv_info);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }
        
        update_count++;
        _gettimeofday(&t2, NULL);
        int64_t used = _time_used(&t1, &t2);

        lsv_info->op_stat[WRITE_BITMAP].count += 1;
        lsv_info->op_stat[WRITE_BITMAP].used += used;

        DERROR("update_bitmap3 chunk_id %u page %u count %u time %llu %llu\n",
               log_chkid,
               log_proto->head->page_count,
               update_count,
               (LLU)used,
               (LLU)used/update_count);

        return 0;
err_ret:
        return ret;
}

/**
 * @param lsv_info
 * @param log_buf
 * @return
 */
int lsv_log_write(lsv_volume_proto_t *lsv_info, void* log_buf) {
        int ret = 0;
        lsv_u32_t log_chkid;
        lsv_log_proto_t log_proto;

        log_proto.log = log_buf;
        lsv_log_proto_link(&log_proto);
        YASSERT(log_proto.head->page_count<=LSV_LOG_PAGE_NUM);

        DINFO_NOP("log_io_arg page_count>%u\n", log_proto.head->page_count);

        // write log
        log_chkid = LSV_CHUNK_NULL_ID;
        if (lsv_feature & LSV_FEATURE_LOG) {
                ret = lsv_volume_chunk_write(lsv_info, log_proto.log,
                                             LSV_LOG_LOG_STORAGE_TYPE, &log_chkid);
                if (unlikely(ret)) {
                        DERROR("volume write err,errno:%d\n", ret);
                        goto ERR0;
                }

                // ERR1

                log_proto.chunk_id = log_chkid;
                DBUG("write_log chunk_id:%u\n", log_chkid);
        }
        lsv_info->log_commit_count++;

        // update bitmap
        if (lsv_feature & LSV_FEATURE_BITMAP) {
                ret = update_bitmap(lsv_info, log_chkid, &log_proto);
                if (unlikely(ret))
                        GOTO(ERR1, ret);
        }

        ret = lsv_gc_add(lsv_info, &log_proto);
        if (unlikely(ret))
                GOTO(ERR1, ret);

        return 0;
ERR1:
        if ( LSV_CHUNK_NULL_ID != log_chkid) {
                lsv_volume_chunk_free(lsv_info, LSV_LOG_LOG_STORAGE_TYPE, log_chkid);
        }
ERR0:
        return ret;
}

int lsv_log_read(lsv_volume_proto_t *lsv_info, uint32_t chunk_id, lsv_s8_t *buf) {
        int ret = 0;

        ret = lsv_volume_chunk_read(lsv_info, chunk_id, buf);
        if (unlikely(ret)) {
                DERROR("volume read err,errno:%d,chunk_id %u\n", ret, chunk_id);
                goto ERR;
        }

        DINFO("read log chunk_id:%u\n", chunk_id);
        lsv_log_check(chunk_id, buf);

ERR:
        return ret;
}

int lsv_log_check(uint32_t chunk_id, lsv_s8_t *buf) {

#if CHECK_CRC
        lsv_log_proto_t log_proto;
        log_proto.log = buf;
        lsv_log_proto_link(&log_proto);

        YASSERT(log_proto.head->page_count >= 0 && log_proto.head->page_count<=LSV_LOG_PAGE_NUM);

        for (int i=0; i < log_proto.head->page_count; i++) {
                DINFO("refmap read %u/%u lba %llu chunk_id %u page %u crc %u snap_id %u\n",
                      i,
                      log_proto.head->page_count,
                      (LLU)log_proto.hlog[i].lba,
                      chunk_id,
                      i,
                      log_proto.hlog[i].crc,
                      log_proto.hlog[i].snap_id);
#ifdef __PAGE_CRC32__
                uint32_t crc = page_crc32(log_proto.slog[i].page);
                YASSERT(log_proto.hlog[i].crc == crc);
#endif
        }
#else
        (void)chunk_id;
        (void)buf;
#endif

        return 0;
}
